package com.stronglink.esm27.datasync;

import java.io.IOException;
import java.io.PrintWriter;
import java.sql.SQLException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import com.stronglink.esm27.datasync.entity.ResultMessage;
import com.stronglink.esm27.datasync.entity.ResultTableEntity;
import com.stronglink.esm27.datasync.entity.SyncDataEntity;
import com.stronglink.esm27.datasync.entity.SyncTableEntity;
import com.stronglink.esm27.datasync.entity.SyncTableStructureEntity;
import com.stronglink.esm27.datasync.exception.SyncExistedException;
import com.stronglink.esm27.datasync.handle.TableCloneHandle;
import com.stronglink.esm27.datasync.mq.ActiveMqClient;
import com.stronglink.esm27.datasync.service.IDataSyncService;

/**
 * REST endpoints that start table-clone and data-sync jobs on a small
 * background thread pool.
 *
 * <p>
 * Both endpoints return as soon as the job has been handed to the pool; the
 * per-table progress of a data-sync job is published through
 * {@link ActiveMqClient} on the destination {@code "esm27.sync." + token}.
 */
@CrossOrigin
@RestController
public class DataSyncController {
	/**
	 * Accepted shape of the {@code jsonpcallback} parameter: a dotted path of
	 * JavaScript identifiers (for example {@code jQuery123_456} or
	 * {@code app.onResult}). Anything else is rejected so that the parameter
	 * cannot be used to inject markup or script into the response.
	 */
	private static final Pattern JSONP_CALLBACK_PATTERN = Pattern
			.compile("[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*");

	/** Prefix of the MQ destination that receives progress messages for a sync token. */
	private static final String SYNC_TOPIC_PREFIX = "esm27.sync.";

	Logger logger = LogManager.getLogger(DataSyncController.class);
	/** Pool that runs clone/sync jobs; at most three jobs run at once, the rest queue. */
	ExecutorService threadPool = Executors.newFixedThreadPool(3);
	/** Tokens of the sync jobs that are currently queued or running (token -> token). */
	Map<String, String> syncDevice = new ConcurrentHashMap<>();
	@Autowired
	private IDataSyncService dataSyncService;
	@Autowired
	ActiveMqClient mqClient;

	/**
	 * Clones the given tables from the source database into the destination
	 * database. The clone itself runs on the background pool; the response only
	 * reports whether the job could be submitted.
	 *
	 * <p>
	 * The result is written as JSONP ({@code callback(json)}) when a
	 * {@code jsonpcallback} request parameter is supplied, and as plain JSON
	 * otherwise. A callback that is not a dotted JavaScript identifier path is
	 * rejected with HTTP 400 before any work is scheduled.
	 *
	 * @param srcDataSourceParams  connection parameters of the source database
	 * @param destDataSourceParams connection parameters of the destination database
	 * @param cloneTables          comma-separated names of the tables to clone
	 * @param request              current request; read for {@code jsonpcallback}
	 * @param response             current response; the result is written to it
	 * @throws IOException if the response cannot be written
	 */
	@ResponseBody
	@RequestMapping(value = "/tableClone", method = { RequestMethod.GET, RequestMethod.POST })
	public void tableClone(@RequestParam("srcDataSourceParams") String srcDataSourceParams,
			@RequestParam("destDataSourceParams") String destDataSourceParams,
			@RequestParam("cloneTables") String cloneTables, HttpServletRequest request, HttpServletResponse response)
			throws IOException {
		String callback = request.getParameter("jsonpcallback");
		boolean jsonp = callback != null && !callback.isEmpty();
		// The callback is echoed verbatim into the response body, so it must be
		// validated before anything is written (prevents reflected XSS).
		if (jsonp && !JSONP_CALLBACK_PATTERN.matcher(callback).matches()) {
			response.sendError(HttpServletResponse.SC_BAD_REQUEST, "invalid jsonpcallback");
			return;
		}

		// NOTE(review): the success path never sets a code, so the client sees
		// whatever ResultMessage defaults to — confirm that is the intended value.
		ResultMessage resultMessage = new ResultMessage();
		try {
			// Run the table-clone job on the thread pool.
			threadPool.execute(new TableCloneHandle(dataSyncService, mqClient, srcDataSourceParams,
					destDataSourceParams, cloneTables.split(",")));
		} catch (SyncExistedException e) {
			resultMessage.setCode(-1);
			resultMessage.setMessage("已有任务正在同步，请稍后再试.");
		} catch (Exception e) {
			logger.error("", e);
			resultMessage.setCode(-1);
			resultMessage.setMessage(e.getMessage());
		}

		String json = JSON.toJSONString(resultMessage);
		// Never serve this payload as text/html: it may contain exception text.
		response.setContentType(jsonp ? "application/javascript" : "application/json");
		response.setCharacterEncoding("utf-8");
		PrintWriter out = response.getWriter();
		out.print(jsonp ? callback + "(" + json + ")" : json);
	}

	/**
	 * Starts an asynchronous data-sync job for the given request.
	 *
	 * <p>
	 * The request token identifies the job. It is claimed atomically on the
	 * calling thread, so a second request with the same token is rejected while
	 * the first one is still queued or running; the claim is released when the
	 * job ends or when the pool refuses the job.
	 *
	 * @param sd sync request: token, data-source parameters and tables to sync
	 * @return code {@code 1} when the job was submitted, code {@code 0} with the
	 *         failure message otherwise
	 * @throws SQLException declared for interface compatibility; failures are
	 *                      reported through the returned message
	 */
	@ResponseBody
	@RequestMapping(value = "/dataSync", method = RequestMethod.POST)
	public ResultMessage dataSync(@RequestBody SyncDataEntity sd) throws SQLException {
		ResultMessage resultMessage = new ResultMessage();
		try {
			final String token = sd.getToken();
			// Null check first: ConcurrentHashMap rejects null keys with a
			// message-less NullPointerException.
			Preconditions.checkNotNull(token, "token不能为空");
			// Atomic claim; a containsKey/put pair would let duplicates through
			// while the first job is still waiting in the pool's queue.
			// NOTE(review): "误" in the message below looks like a typo for "勿";
			// left untouched because clients may display or match it.
			Preconditions.checkArgument(syncDevice.putIfAbsent(token, token) == null, "数据正在同步中，请误重复操作。");
			try {
				threadPool.execute(() -> runSync(sd, token));
			} catch (RuntimeException e) {
				// The job never started, so its finally block will not release the token.
				syncDevice.remove(token);
				throw e;
			}
			resultMessage.setCode(1);
			resultMessage.setMessage("同步数据启动成功");
		} catch (Exception e) {
			logger.error("", e);
			resultMessage.setCode(0);
			resultMessage.setMessage(e.getMessage());
		}
		return resultMessage;
	}

	/**
	 * Body of a data-sync job: syncs every requested table that exists in the
	 * source structure, publishes start/complete/error messages per table, and
	 * always releases the token at the end.
	 */
	private void runSync(SyncDataEntity sd, String token) {
		String topic = SYNC_TOPIC_PREFIX + token;
		try {
			String srcDataSourceParams = sd.getSrcDataSourceParams();
			String destDataSourceParams = sd.getDestDataSourceParams();
			SyncTableEntity[] syncTables = sd.getSyncTables();

			Preconditions.checkNotNull(syncTables, "需要同步的表不能为空");

			// Structure of the tables shared by the source and destination databases.
			SyncTableStructureEntity structure = dataSyncService.getSyncTableStructure(srcDataSourceParams,
					destDataSourceParams, syncTables);
			Map<String, Set<String>> srcTableStructure = structure.getSrcTableStructure();

			for (SyncTableEntity syncTable : syncTables) {
				String tableName = syncTable.getTableName();
				if (!srcTableStructure.containsKey(tableName)) {
					// Tables missing from the source structure are skipped silently.
					continue;
				}
				Set<String> tableValue = srcTableStructure.get(tableName);
				publish(topic, new ResultTableEntity(token, tableName, "同步开始", "starting"));

				ResultTableEntity outcome;
				try {
					dataSyncService.syncTable(sd, tableName, tableName, tableValue,
							structure.getSyncTableParams().get(tableName));
					outcome = new ResultTableEntity(token, tableName, "同步完成", "complete");
				} catch (Exception e) {
					// A failing table is reported and the remaining tables are still processed.
					logger.error("", e);
					outcome = new ResultTableEntity(token, tableName, e.getMessage(), "error");
				}
				publish(topic, outcome);
			}
			publish(topic, new ResultTableEntity(token, "all", "同步所有表完成", "all_complete"));
		} catch (Exception e) {
			logger.error("", e);
			publish(topic, new ResultTableEntity(token, "", e.getMessage(), "error"));
		} finally {
			syncDevice.remove(token);
		}
	}

	/** Sends one progress message, serialized as JSON, to the given MQ destination. */
	private void publish(String topic, ResultTableEntity message) {
		mqClient.send(topic, JSON.toJSONString(message));
	}
}
